package com.bawei.persona.realtime.app.func;

import com.alibaba.fastjson.JSONObject;

import com.bawei.persona.realtime.bean.KafkaBean;
import com.bawei.persona.realtime.bean.TableProcess;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.jdbc.JDBCInputFormat;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import java.util.ArrayList;
import java.util.List;


/**
 * 上海大数据学院
 * 项目规划及管理：李剑
 * 技术指导及需求分析：郭洵
 * 编程：楚志高
 *
 * @author bawei  bigdata sh
 * @since 2021-06-11
 * 将来根据不同的数据类型是kafka 还是hbadse 进行数据的不同输出
 */
public class TableProcessToHbaseOrKafkaFunction  extends ProcessFunction<JSONObject, KafkaBean> {

    //因为要将维度数据通过侧输出流输出，所以我们在这里定义一个侧输出流标记
    private OutputTag<KafkaBean> outputTag;
//
 private   List<TableProcess> tableProcesses = new ArrayList<TableProcess>() ;

    //初始构造方法
    public TableProcessToHbaseOrKafkaFunction(OutputTag<KafkaBean> outputTag) {
        this.outputTag = outputTag;
        //初始化数据库中的需要做的执行流程表记录

    }
 // 数据开始启动将数据库表中的数据查询出来
    @Override
    public void open(Configuration parameters) throws Exception {

        ExecutionEnvironment staticEnv = ExecutionEnvironment.getExecutionEnvironment();
        //将数据库中的需要定期执行的流程表执行出来
        List<TableProcess> tableProcesses = jdbcRead(staticEnv);


    }

    @Override
    public void processElement(JSONObject jsonObject, Context context, Collector<KafkaBean> out) throws Exception {

        String tablename = jsonObject.get("table").toString();
        String type1 = jsonObject.get("type").toString();
        String data = jsonObject.get("data").toString();
        //根据得到的结果去进行不同的操作。 如果是
        //简单处理，根据其中数据库中的表名进行处理



        for (int i = 0; i <  tableProcesses.size(); i++) {
            //得动数据库表中每一条记录，循环比较
            TableProcess tableProcess = tableProcesses.get(i);
            // 如果得到的数据是来源与hbase表中需要保存到dim 中去，需要保存在测输出流
            //外加一个条件，对应的表的名字相等
            if(tableProcess.getSinkType().equals("hbase") && tableProcess.getSourceTable().equals(tablename) ){
               // 这里需要注意，将来的数据往hbase中写的时候需要动态的去将hbase中的列匹配解析data 的里面的数据
                //所以需要把输出的数据的列族里面的列值 动态解析
                KafkaBean kafkaBean = new KafkaBean();
                //必须的，将来保存所有的列族的value
                kafkaBean.setData(data);
                kafkaBean.setType(type1);
                kafkaBean.setTablename(tablename);
                //将来对应的hbase表中的数据也要匹配
                kafkaBean.setSinkTable(tableProcess.getSinkTable());
                //将来需要给写入到hbase表中的数据字段
                kafkaBean.setSinkcolumn(tableProcess.getSinkColumns());
                //将插入hbase 中的数据输出到hbase的测输出流，将来提供给被调用程序进行数据插入
                context.output(outputTag, kafkaBean);
            }
            //将数据从正流中输出得到结果
            if(tableProcess.getSinkType().equals("kafka")){

                KafkaBean kafkaBean = new KafkaBean();
                kafkaBean.setData(data);
                kafkaBean.setType(type1);
                kafkaBean.setTablename(tablename);
                out.collect(kafkaBean);
            }
        }



    }

    /*
     分装方法： 将设置配置好的东西取出来，将来按着这个配置将数据进行分流
    */
    public  List<TableProcess> jdbcRead(ExecutionEnvironment env) throws Exception {
        DataSet<Row> rowDataSet = env.createInput(JDBCInputFormat.buildJDBCInputFormat()
                //配置数据库连接信息
                .setDrivername("com.mysql.jdbc.Driver")
                .setDBUrl("jdbc:mysql://hadoop101:3306/gmall2021")
                .setUsername("root")
                .setPassword("000000")
                .setQuery("select * from table_process")
                //设置查询的列的类型，根据实际情况定
                .setRowTypeInfo(new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO))
                .finish());
        List<Row> list = rowDataSet.collect();
        List<TableProcess> listNew = new ArrayList<TableProcess>();
        for (int i = 0; i <  list.size(); i++) {
            Row row = list.get(i);
            String source_table = row.getField(0).toString();
            String operate_type = row.getField(1).toString();
            String sink_type = row.getField(2).toString();
            String sink_table = row.getField(3).toString();
            String sink_columns = row.getField(4).toString();
            String sink_pk = row.getField(5)==null ?"" :row.getField(5).toString();
            String sink_extend = row.getField(6)==null ?"":row.getField(6) .toString();
            TableProcess tableProcess = new TableProcess() ;
            tableProcess.setSourceTable(source_table);
            tableProcess.setOperateType(operate_type);
            tableProcess.setSinkType(sink_type);
            tableProcess.setSinkTable(sink_table);
            tableProcess.setSinkColumns(sink_columns);
            tableProcess.setSinkPk(sink_pk);
            tableProcess.setSinkExtend(sink_extend);
            listNew.add(tableProcess) ;
        }
        return  listNew;
    }

}
